package com.zhao.apitest.window;

import com.zhao.apitest.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.swing.text.html.HTMLDocument;

/**
 * @author xiaoZhao
 * @date 2022/5/23
 * @describe
 */
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);

        //DataStreamSource<String> inputStream = env.readTextFile("sensor.txt");

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);


        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //设置一个15秒的一个滚动窗口
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                //会话窗口
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                //滑动计数窗口
                //.countWindow(10,20)
                //对窗口进行聚合操作 增量窗口操作
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    //创建累加器
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator+1;
                    }

                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return null;
                    }
                });

                //全量窗口函数
        DataStream<Tuple3<String,Long,Integer>> resultStream2 = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
                        String id =tuple.getField(0);
                        Long windowEnd =window.getEnd();

                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id,windowEnd,count));
                    }
                });


        //resultStream.print();
        resultStream2.print();

        env.execute();
    }
}
